跳到主要内容

MapReduce 作业运行机制 ⭐️

通过调用job对象的submit()方法就可以运行MapReduce作业,在这个方法中封装处理了很多作业在运行过程中的细节问题,下面我们学习其是如何实现的。作业的整个过程涉及下面5个组件的合作运行:

  1. 客户端,提交 MapReduce 作业。

  2. YARN 资源管理器,负责协调集群上计算机资源的分配。

  3. YARN 节点管理器(nodemanager),负责启动和监视集群中机器上的计算容器(container)。

  4. MapReduce 的application master,  负责协调运行MapReduce作业的任务。它和 MapReduce 任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理。

  5. 分布式文件系统(这里特指HDFS),用来存储作业相关的依赖文件以在集群中共享。

1.作业的提交

在Job的submit()方法中,会创建一个内部的JobSubmitter实例,并调用其submitJobInternal()方法来提交作业(对应上图中步骤1)。

提交作业后,waitForCompletion()方法会每秒轮询作业的进度。如果发现自上次报告后进度有改变,它会将进度报告显示在控制台上。当作业完成后,如果成功,会显示作业的计数器;如果失败,则会将导致作业失败的错误记录在控制台上。

作业提交过程:

  1. 向资源管理器请求一个新的应用ID,用于MapReduce作业的ID(对应上图中的步骤2)。

  2. 检查作业的输出说明。例如,如果没有指定输出目录或输出目录已经存在,作业将不会被提交,并将错误返回给MapReduce程序。

  3. 计算作业的输入分片。如果无法计算分片,例如因为输入路径不存在,作业将不会被提交,并将错误返回给MapReduce程序。

  4. 将运行作业所需的资源(包括作业JAR文件、配置文件和计算得到的输入分片)复制到以作业ID命名的共享文件系统目录下(对应上图中的步骤3)。

作业JAR文件的副本数量较多(由mapreduce.client.submit.file.replication属性控制,默认值为10),因此在运行任务时,节点管理器可以从多个副本中访问资源。

  1. 通过调用资源管理器的submitApplication()方法提交作业(对应上图中的步骤4)。

2.作业的初始化

在提交一个MapReduce作业时,资源管理器(ResourceManager)收到调用它的submitApplication()消息后,会将请求传递给YARN调度器(scheduler)。调度器会分配一个容器,并在节点管理器(NodeManager)的管理下,在容器中启动应用程序的application master进程(对应上图中5a,5b)。

MapReduce作业的application master是一个Java应用程序,主类是MRAppMaster。它通过创建多个簿记对象来初始化作业,以跟踪作业的进度(对应上图中的步骤6)。

接下来,它接受来自共享文件系统的输入分片,然后为每个分片创建一个map任务对象和多个reduce任务对象(数量由作业的setNumReduceTasks()方法设置)。此时会分配任务ID(对应上图中的步骤7)。

application master需要决定如何运行组成MapReduce作业的各个任务。如果作业很小,它会选择在同一个JVM上运行任务。当application master判断在新的容器中分配和运行任务的开销大于顺序运行这些任务的开销时,就会选择在同一个JVM上运行任务。这种作业被称为uberized,或者称为uber任务

小作业指的是mapper数量少于10且reducer数量为1且输入大小小于一个HDFS块的作业。可以通过调整下面配置参数来改变这些阈值。

  • job.ubertask.maxmaps

  • job.ubertask.maxreduces

  • job.ubertask.maxbytes

要启用uber任务,可以将mapreduce.job.ubertask.enable设置为true。

最后,在运行任何任务之前,application master会调用setupJob()方法来设置OutputCommitter。默认情况下,使用FileOutputCommitter,它负责设置作业的最终输出目录和任务的临时工作空间。

3.任务的分配

如果一个作业不适合作为uber任务运行,那么application master会为该作业中的所有map任务和reduce任务向资源管理器请求容器。首先会发出对map任务的请求,这个请求的优先级比reduce任务的请求要高,因为所有的map任务必须在reduce排序阶段启动之前完成。只有当有5%的map任务已经完成时,才会发出reduce任务的请求(对应上图中的步骤8)。

reduce任务可以在集群中的任意位置运行,但是map任务的请求受到数据本地化的限制,这也是调度器关注的。在理想情况下,任务是数据本地化的,意味着任务在分片所在的同一节点上运行。另一种情况是任务可能是机架本地化的,即任务和分片在同一机架但不是同一节点上运行。还有一些任务既不是数据本地化也不是机架本地化,它们需要从运行所在机架以外的机架获取数据。对于一个特定的作业运行,可以通过查看作业的计数器来确定在每个本地化层次上运行的任务的数量。

请求还指定了任务的内存需求和CPU数。默认情况下,每个map任务和reduce任务分配1024MB的内存和一个虚拟内核。这些值可以在每个作业的基础设置以下四个属性,这些值受到最大值和最小值的限制。

  • map.memory.mb

  • reduce.memory.mb

  • map.cpu.vcores

  • reduce.cpu.vcores

4.任务的执行

一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master通过与节点管理器通信来启动容器(对应上图中9a,9b)。

任务是通过一个名为YarnChild的Java应用程序执行的。在运行任务之前,首先需要将任务所需的资源本地化,包括作业的配置、JAR文件和来自分布式缓存的所有文件(对应上图中步骤10)。

最后,执行map任务或reduce任务(对应上图中的步骤11)。

5.进度和状态的更新

在MapReduce作业中,作业和每个任务都有一个状态(status),包括作业或任务的状态(例如运行中、成功完成、失败)、map和reduce的进度、作业计数器的值以及状态消息或描述(可以由用户代码设置)。这些状态信息在作业执行期间会不断改变,并通过与客户端的通信进行传递。

任务在运行时会跟踪其进度(progress),即任务完成的百分比。对于map任务,进度是已处理输入所占的比例。对于reduce任务,进度稍微复杂一些,但系统仍然可以估计已处理的reduce输入的比例。整个过程分为三个阶段,与shuffle的三个阶段相对应。例如,如果一个任务已经处理了reducer输入的一半,那么任务的进度就是5/6,这是因为已经完成了复制和排序阶段(每个占1/3),并且已经完成了reduce阶段的一半(1/6)。

任务还具有一组计数器,用于计数任务运行过程中的各种事件。这些计数器可以是框架内置的,例如已写入的map输出记录数,也可以是用户自定义的。

当map任务或reduce任务运行时,子进程和它们的父application master通过umbilical接口进行通信。每隔3秒,任务通过这个接口向自己的application master报告进度和状态(包括计数器)。application master会汇总这些报告,形成作业的汇总视图。

Yarn Web界面显示了所有正在运行的应用程序,并提供链接到各个应用程序的application master界面。这些界面展示了MapReduce作业的更多细节,包括作业的进度。

在作业执行期间,客户端可以每秒轮询一次application master,以获取最新的状态信息。轮询间隔可以通过设置mapreduce.client.progressmonitor.pollinterval或使用Job的getStatus()方法来调整。

6.作业的完成

当Application Master接收到作业的最后一个任务完成的通知后,它会将作业的状态设置为“成功”。这意味着整个作业已经成功完成。

在Job轮询状态时,Job会知道作业已经成功完成,并打印一条消息告知用户。此时,Job也会将统计信息和计数值输出到控制台,以便用户查看。

如果Application Master进行了相应的设置,它还会发送一个HTTP作业通知。客户端可以通过设置mapreduce.job.end-notification.url属性来接收回调指令。

最后,当作业完成时,Application Master和任务容器会清理它们的工作状态,这将导致中间输出被删除。同时,OutputCommitter的commitJob()方法会被调用。作业的信息会被作业历史服务器存档,以便用户在需要时可以查询。